/*
 * Unidata Platform Community Edition
 * Copyright (c) 2013-2020, UNIDATA LLC, All rights reserved.
 * This file is part of the Unidata Platform Community Edition software.
 *
 * Unidata Platform Community Edition is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Unidata Platform Community Edition is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 */

package org.unidata.mdm.data.service.segments.relations.merge;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.unidata.mdm.core.service.MetaModelService;
import org.unidata.mdm.core.type.model.RelationElement;
import org.unidata.mdm.core.util.SecurityUtils;
import org.unidata.mdm.data.context.AbstractRelationIdentityContext;
import org.unidata.mdm.data.context.AccessRightContext;
import org.unidata.mdm.data.context.MergeDuplicatesContext;
import org.unidata.mdm.data.context.MergeFromRelationRequestContext;
import org.unidata.mdm.data.context.MergeItemContext;
import org.unidata.mdm.data.context.MergeRelationsRequestContext;
import org.unidata.mdm.data.context.MergeToRelationRequestContext;
import org.unidata.mdm.data.context.ReadWriteDataContext;
import org.unidata.mdm.data.context.RecordIdentityContext;
import org.unidata.mdm.data.dao.RelationsDAO;
import org.unidata.mdm.data.dto.MergeRelationDTO;
import org.unidata.mdm.data.dto.MergeRelationsDTO;
import org.unidata.mdm.data.dto.RelationStateDTO;
import org.unidata.mdm.data.exception.DataExceptionIds;
import org.unidata.mdm.data.exception.DataProcessingException;
import org.unidata.mdm.data.module.DataModule;
import org.unidata.mdm.data.service.impl.CommonRecordsComponent;
import org.unidata.mdm.data.service.impl.CommonRelationsComponent;
import org.unidata.mdm.data.service.impl.MergeRelationsComponent;
import org.unidata.mdm.data.type.data.OriginRelation;
import org.unidata.mdm.data.type.data.RelationType;
import org.unidata.mdm.data.type.keys.RecordKeys;
import org.unidata.mdm.data.type.merge.MergeRelationMasterState;
import org.unidata.mdm.meta.configuration.Descriptors;
import org.unidata.mdm.meta.type.RelativeDirection;
import org.unidata.mdm.system.context.CommonRequestContext;
import org.unidata.mdm.system.service.ExecutionService;
import org.unidata.mdm.system.type.pipeline.Connector;
import org.unidata.mdm.system.type.pipeline.Pipeline;
import org.unidata.mdm.system.type.pipeline.PipelineInput;
import org.unidata.mdm.system.type.pipeline.fragment.InputFragmentContainer;
import org.unidata.mdm.system.type.runtime.MeasurementPoint;

/**
 * @author Mikhail Mikhailov on Dec 4, 2019
 */
@Component(RelationMergeConnectorExecutor.SEGMENT_ID)
public class RelationMergeConnectorExecutor extends Connector<PipelineInput, MergeRelationsDTO> {
    /**
     * This segment ID.
     */
    public static final String SEGMENT_ID = DataModule.MODULE_ID + "[RELATIONS_MERGE_CONNECTOR]";
    /**
     * Localized message code.
     */
    public static final String SEGMENT_DESCRIPTION = DataModule.MODULE_ID + ".relations.merge.connector.description";
    /**
     * Logger.
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(RelationMergeConnectorExecutor.class);
    /**
     * The execution service.
     */
    @Autowired
    private ExecutionService executionService;
    /**
     * The MMS instance.
     */
    @Autowired
    private MetaModelService metaModelService;
    /**
     * The CRC I.
     */
    @Autowired
    private CommonRecordsComponent commonRecordsComponent;
    /**
     * The CRC II.
     */
    @Autowired
    private CommonRelationsComponent commonRelationsComponent;
    /**
     * The relation merge component.
     */
    @Autowired
    private MergeRelationsComponent mergeRelationsComponent;
    /**
     * Relations vistory DAO.
     */
    @Autowired
    private RelationsDAO relationsDao;
    /**
     * Constructor.
     */
    public RelationMergeConnectorExecutor() {
        super(SEGMENT_ID, SEGMENT_DESCRIPTION);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public MergeRelationsDTO connect(PipelineInput ctx) {

        InputFragmentContainer target = (InputFragmentContainer) ctx;
        MergeRelationsRequestContext payload = target.fragment(MergeRelationsRequestContext.FRAGMENT_ID);
        if (Objects.isNull(payload)) {
            return null;
        }

        if (ctx instanceof RecordIdentityContext) {
            payload.keys(((RecordIdentityContext) ctx).keys());
        }

        if (ctx instanceof MergeDuplicatesContext) {
            payload.duplicateKeys(((MergeDuplicatesContext<?>) ctx).duplicateKeys());
        }

        if (ctx instanceof ReadWriteDataContext) {
            payload.timestamp(((ReadWriteDataContext<?>) ctx).timestamp());
        }

        payload.setOperationId(((CommonRequestContext) ctx).getOperationId());
        return execute(payload, null);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public MergeRelationsDTO connect(PipelineInput ctx, Pipeline p) {

        InputFragmentContainer target = (InputFragmentContainer) ctx;
        MergeRelationsRequestContext payload = target.fragment(MergeRelationsRequestContext.FRAGMENT_ID);
        if (Objects.isNull(payload)) {
            return null;
        }

        if (ctx instanceof RecordIdentityContext) {
            payload.keys(((RecordIdentityContext) ctx).keys());
        }

        if (ctx instanceof MergeDuplicatesContext) {
            payload.duplicateKeys(((MergeDuplicatesContext<?>) ctx).duplicateKeys());
        }

        if (ctx instanceof ReadWriteDataContext) {
            payload.timestamp(((ReadWriteDataContext<?>) ctx).timestamp());
        }

        payload.setOperationId(((CommonRequestContext) ctx).getOperationId());
        return execute(payload, p);
    }
    /**
     * Does the actual context processing.
     * @param ctx the context
     * @param p the pipeline
     * @return result
     */
    public MergeRelationsDTO execute(@Nonnull MergeRelationsRequestContext ctx, @Nullable Pipeline p) {

        MeasurementPoint.start();
        try {

            // 1. First of all check master keys (the main keys interface is the master key)
            commonRelationsComponent.ensureAndGetFromRecordKeys(ctx);

            // 2. Resolve duplicates, if needed. Return on "no duplicates to process".
            if (CollectionUtils.isEmpty(ctx.duplicateKeys()) && CollectionUtils.isNotEmpty(ctx.getDuplicates())) {

                List<RecordKeys> duplicates = ctx.getDuplicates().stream()
                        .map(commonRecordsComponent::ensureKeys)
                        .filter(Objects::nonNull)
                        .collect(Collectors.toList());

                ctx.duplicateKeys(duplicates);
            }

            if (CollectionUtils.isEmpty(ctx.duplicateKeys())) {
                return null;
            }

            // 3. Check input. Return on no input, what is not a crime
            Map<String, List<MergeFromRelationRequestContext>> from = ensureFromInput(ctx);
            Map<String, List<MergeToRelationRequestContext>> to = ensureToInput(ctx);
            if (MapUtils.isEmpty(from) && MapUtils.isEmpty(to)) {
                return null;
            }

            Map<String, List<AbstractRelationIdentityContext>> input = new HashMap<>();
            from.entrySet().forEach(entry -> input.computeIfAbsent(entry.getKey(), k -> new ArrayList<AbstractRelationIdentityContext>())
                           .addAll(entry.getValue()));
            to.entrySet().forEach(entry -> input.computeIfAbsent(entry.getKey(), k -> new ArrayList<AbstractRelationIdentityContext>())
                         .addAll(entry.getValue()));

            // 4. Merge input and execute
            RecordKeys masterKeys = ctx.keys();
            if (Objects.isNull(ctx.timestamp())) {
                ctx.timestamp(new Date());
            }

            return new MergeRelationsDTO(masterKeys, executeMerge(input, p, ctx));
        } finally {
            MeasurementPoint.stop();
        }
    }

    private Map<String, List<MergeFromRelationRequestContext>> ensureFromInput(MergeRelationsRequestContext ctx) {

        if (MapUtils.isNotEmpty(ctx.getRelationsFrom())) {
            return ctx.getRelationsFrom();
        } else if (CollectionUtils.isEmpty(ctx.getFromRelationNames()) && !ctx.isApplyToAll()) {
            return Collections.emptyMap();
        }

        Map<String, List<UUID>> relationEtalonIds =
                ctx.duplicateKeys().stream()
                .map(keys -> relationsDao.loadMappedRelationEtalonIds(
                        UUID.fromString(keys.getEtalonKey().getId()),
                            ctx.getFromRelationNames(),
                            RelativeDirection.FROM))
                .filter(MapUtils::isNotEmpty)
                .map(Map::entrySet)
                .flatMap(Collection::stream)
                .collect(Collectors.groupingBy(Entry::getKey,
                         Collectors.flatMapping(entry -> entry.getValue().stream(), Collectors.toList())));

        if (MapUtils.isEmpty(relationEtalonIds)) {
            return Collections.emptyMap();
        }

        Map<String, List<MergeFromRelationRequestContext>> result = new HashMap<>(relationEtalonIds.size());
        relationEtalonIds.forEach((k, v) ->
            result.put(k, v.stream()
                    .map(id ->
                        MergeFromRelationRequestContext.builder()
                            .relationEtalonKey(id.toString())
                            .operationId(ctx.getOperationId())
                            .build())
                    .collect(Collectors.toList())));

        return result;
    }

    private Map<String, List<MergeToRelationRequestContext>> ensureToInput(MergeRelationsRequestContext ctx) {

        if (MapUtils.isNotEmpty(ctx.getRelationsTo())) {
            return ctx.getRelationsTo();
        } else if (CollectionUtils.isEmpty(ctx.getToRelationNames()) && !ctx.isApplyToAll()) {
            return Collections.emptyMap();
        }

        Map<String, List<UUID>> relationEtalonIds =
                ctx.duplicateKeys().stream()
                .map(keys -> relationsDao.loadMappedRelationEtalonIds(
                        UUID.fromString(keys.getEtalonKey().getId()),
                            ctx.getToRelationNames(),
                            RelativeDirection.TO))
                .filter(MapUtils::isNotEmpty)
                .map(Map::entrySet)
                .flatMap(Collection::stream)
                .collect(Collectors.groupingBy(Entry::getKey,
                         Collectors.flatMapping(entry -> entry.getValue().stream(), Collectors.toList())));

        if (MapUtils.isEmpty(relationEtalonIds)) {
            return Collections.emptyMap();
        }

        Map<String, List<MergeToRelationRequestContext>> result = new HashMap<>(relationEtalonIds.size());
        relationEtalonIds.forEach((k, v) ->
            result.put(k, v.stream()
                    .map(id ->
                        MergeToRelationRequestContext.builder()
                            .relationEtalonKey(id.toString())
                            .operationId(ctx.getOperationId())
                            .build())
                    .collect(Collectors.toList())));

        return result;
    }

    @SuppressWarnings("unchecked")
    private Map<RelationStateDTO, List<MergeRelationDTO>> executeMerge(
            Map<String, List<AbstractRelationIdentityContext>> input,
            Pipeline p,
            MergeRelationsRequestContext ctx) {

        // 0. Set up vars
        RecordKeys masterKeys = ctx.keys();
        Date timestamp = ctx.timestamp();

        // 1. Init state
        MergeRelationMasterState mergeState = ensureMergeMasterState(masterKeys, ctx);

        // 2. Execute
        Map<RelationStateDTO, List<MergeRelationDTO>> result = new HashMap<>();
        for (Entry<String, List<AbstractRelationIdentityContext>> entry : input.entrySet()) {

            if (CollectionUtils.isEmpty(entry.getValue())) {
                continue;
            }

            // 2.1 Check rel's existance. Fail if not found
            final RelationElement relation = ensureRelationDefinition(entry.getKey());

            // 2.2 Set up content and run single gets
            final String resolvedName = relation.getName();
            final RelationType resolvedType = RelationType.fromModel(relation);

            RelationStateDTO state = new RelationStateDTO(resolvedName, resolvedType);
            List<MergeRelationDTO> collected = new ArrayList<>(entry.getValue().size());
            for (AbstractRelationIdentityContext mCtx : entry.getValue()) {

                String entityName = masterKeys != null ? masterKeys.getEntityName() : relation.getLeft().getName();

                ((AccessRightContext) mCtx).accessRight(SecurityUtils.getRightsForResourceWithDefault(entityName));
                ((MergeItemContext<MergeRelationMasterState>) mCtx).masterKeys(masterKeys);
                ((MergeItemContext<MergeRelationMasterState>) mCtx).masterState(mergeState);
                ((ReadWriteDataContext<OriginRelation>) mCtx).timestamp(timestamp);
                mCtx.relationName(resolvedName);
                mCtx.relationType(resolvedType);
                mCtx.setOperationId(ctx.getOperationId());

                MergeRelationDTO interim;
                if (Objects.isNull(p)) {
                    interim = executionService.execute((PipelineInput) mCtx);
                } else {
                    interim = executionService.execute(p, (PipelineInput) mCtx);
                }

                if (Objects.nonNull(interim)) {
                    collected.add(interim);
                }
            }

            result.put(state, collected);
        }

        return result;
    }

    private RelationElement ensureRelationDefinition(String name) {

        RelationElement relation = metaModelService.instance(Descriptors.DATA).getRelation(name);
        if (relation == null) {
            final String message = "Relation [{}] not found. Stopping.";
            LOGGER.warn(message, name);
            throw new DataProcessingException(message, DataExceptionIds.EX_DATA_RELATIONS_GET_RELATION_NOT_FOUND, name);
        }

        return relation;
    }

    private MergeRelationMasterState ensureMergeMasterState(RecordKeys masterKeys, MergeRelationsRequestContext ctx) {

        MergeRelationMasterState mergeState;
        if (MapUtils.isNotEmpty(ctx.getRelationsFrom()) || MapUtils.isNotEmpty(ctx.getRelationsTo())) {
            mergeState = mergeRelationsComponent.getMasterState(masterKeys, ctx.getRelationsFrom().keySet(), ctx.getRelationsTo().keySet());
        } else if (CollectionUtils.isNotEmpty(ctx.getFromRelationNames()) || CollectionUtils.isNotEmpty(ctx.getToRelationNames())) {
            mergeState = mergeRelationsComponent.getMasterState(masterKeys, ctx.getFromRelationNames(), ctx.getToRelationNames());
        } else {
            mergeState = mergeRelationsComponent.getMasterState(masterKeys);
        }

        return mergeState;
    }
}
